package com.wuwangfu.window.process;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Author: jcshen
 * @Date: 2023-03-09
 * @PackageName: com.wuwangfu.window.process
 * @ClassName: WindowProcessFunction
 * @Description: Demo job that reads {@code timestamp,word,count} lines from a text socket,
 * keys them by word, groups them into tumbling event-time windows and re-emits every element
 * of a window from a {@link ProcessWindowFunction}.
 * @Version: 1.0.0
 * <p>
 * https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/#processwindowfunction
 */
public class WindowProcessFunction {

    /** Socket host used when no host is given on the command line. */
    private static final String DEFAULT_HOST = "localhost";

    /** Socket port used when no port is given on the command line. */
    private static final int DEFAULT_PORT = 8888;

    /** Interval handed to {@code enableCheckpointing}, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 10000L;

    /** Size of each tumbling event-time window, in seconds. */
    private static final long WINDOW_SIZE_SECONDS = 5L;

    /** Separator between the fields of one input line. */
    private static final String FIELD_SEPARATOR = ",";

    /**
     * Builds and runs the streaming job.
     *
     * @param args optional {@code [host] [port]} of the text socket to read from; missing values
     *             fall back to {@code localhost} and {@code 8888}
     * @throws IllegalArgumentException if a port argument is given but is not an integer in
     *                                  the range 1-65535
     * @throws Exception                if the job fails while executing
     */
    public static void main(String[] args) throws Exception {
        // Running without arguments keeps the original localhost:8888 endpoint.
        final String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        final int port = args.length > 1 ? parsePort(args[1]) : DEFAULT_PORT;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);

        // Example input, one record per line (timestamp,word,count):
        //        1000,hive,1
        //        2000,hive,2
        //        3000,spark,3
        //        5000,hadoop,5
        DataStreamSource<String> lines = env.socketTextStream(host, port);

        // The first field of each raw line is parsed as the record's event timestamp;
        // watermarks use a zero out-of-orderness bound.
        SingleOutputStreamOperator<String> linesWithWatermarks = lines.assignTimestampsAndWatermarks(WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(FIELD_SEPARATOR)[0]))
        );

        // Keep only the (word, count) part of each line. An anonymous class is used here so the
        // Tuple2 type parameters stay visible in the function's declaration.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = linesWithWatermarks.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(FIELD_SEPARATOR);
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        // Partition the stream by word.
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapped.keyBy(t -> t.f0);

        // Assign every keyed element to a tumbling event-time window.
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowed.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
            /**
             * Called only when the window fires. The method receives the complete set of
             * elements collected for the key and window (they are buffered in window state
             * until then) and forwards each of them unchanged.
             */
            @Override
            public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (Tuple2<String, Integer> element : elements) {
                    out.collect(element);
                }
            }
        });

        result.print();

        env.execute();
    }

    /**
     * Parses the port command-line argument.
     *
     * @param raw the argument text
     * @return the port number
     * @throws IllegalArgumentException if {@code raw} is not an integer in the range 1-65535
     */
    private static int parsePort(String raw) {
        final int port;
        try {
            port = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port must be an integer, got: " + raw, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port must be in the range 1-65535, got: " + port);
        }
        return port;
    }
}
